MODULE mo_transpose ! !+ $Id: mo_transpose.f90,v 1.33 1999/09/28 10:09:19 m214030 Exp $ ! 5: ! this module holds the global distribution and transposition routines: ! global distribution routines (global <-> pe) ! transposition routines (pe <-> pe) ! ! Authors: 10: ! ! A. Rhodin, MPI, August 1999, original source ! USE mo_exception, ONLY: finish ! abort in case of errors USE mo_doctor, ONLY: nerr ! stderr output unit 15: USE mo_mpi, ONLY: p_send, &! MPI_send routine p_recv, &! MPI_recv routine p_bcast, &! MPI_bcast routine p_pe, &! this processor p_nprocs, &! number of processors 20: p_io ! processor which performs I/O USE mo_decomposition, ONLY: pe_decomposed, &! decomposition table data type debug_parallel,&! debug flag dc=>local_decomposition 25: IMPLICIT NONE PRIVATE ! ! public routines: ! 30: PUBLIC :: indx ! get index within decomposition table from processor id ! ! scatter : global arrays -> local arrays ! PUBLIC :: scatter_gp ! global grid point field -> local pe's 35: PUBLIC :: scatter_ls ! global spectral field -> Legendre space PUBLIC :: scatter_sp ! global spectral field -> local pe's PUBLIC :: scatter_sa ! global Fourier sym/asym -> local pe's ! ! gather : global arrays <- local arrays 40: ! PUBLIC :: gather_gp ! global grid point field <- local pe's PUBLIC :: gather_gp3 ! global grid point field <- local pe's (nlon,nlev,nlat) PUBLIC :: gather_ls ! global spectral field <- Legendre space PUBLIC :: gather_sp ! global spectral field <- local pe's 45: PUBLIC :: gather_sa ! global Fourier sym/asym <- local pe's ! ! transpositions: ! PUBLIC :: tr_gp_fs ! transpose grid point space <-> Fourier space 50: PUBLIC :: tr_fs_ls ! transpose Fourier space <-> Legendre space PUBLIC :: tr_ls_sp ! transpose Legendre space <-> spectral space ! ! public constants ! 55: PUBLIC :: tag_gather_gp ! ! interfaces (specific routines) ! 60: ! Gridpoint space ! INTERFACE gather_gp MODULE PROCEDURE gather_gp432 ! gather gridp. field (nlon,nlev,ntrac,nlat) ! or (nlon,nlev,nlat,1) 65: ! or (nlon,nlat) MODULE PROCEDURE gather_gp32 ! gather gridp. field (nlon,nlev,nlat) ! or (nlon,nlat,1) MODULE PROCEDURE gather_gp2 ! gather only m=0 wave number (nlon,nlat) END INTERFACE 70: INTERFACE scatter_gp MODULE PROCEDURE scatter_gp432! scatter gridp. field (nlon,nlev,ntrac,nlat) ! or (nlon,nlev,nlat,1) ! or (nlon,nlat) 75: MODULE PROCEDURE scatter_gp32 ! scatter gridp. field (nlon,nlev,nlat) ! or (nlon,nlat,1) END INTERFACE ! ! Legendre space 80: ! INTERFACE scatter_ls MODULE PROCEDURE scatter_ls3 ! scatter full spectral field (nlev, 2, nsp) MODULE PROCEDURE scatter_ls0 ! scatter only m=0 wave number (nlev, nnp1) END INTERFACE 85: INTERFACE gather_ls MODULE PROCEDURE gather_ls3 ! gather full spectral field (nlev, 2, nsp) MODULE PROCEDURE gather_ls0 ! gather only m=0 wave number (nlev, nnp1) MODULE PROCEDURE gather_ls4 ! spectral fields (nmp1*2,nlevp1,nlat,nvar) 90: END INTERFACE ! ! symetric and asymetric forier components ! decomposition in legendre space ! 95: INTERFACE scatter_sa MODULE PROCEDURE scatter_sa42 ! sym/asym components (nlev,2,nmp1,nhgl) ! or (nlev,nhgl,1,1) MODULE PROCEDURE scatter_sa2 ! sym/asym components (nlev,nhgl) END INTERFACE 100: INTERFACE gather_sa MODULE PROCEDURE gather_sa42 ! sym/asym components (nlev,2,nmp1,nhgl) ! or (nlev,nhgl,1,1) MODULE PROCEDURE gather_sa2 ! sym/asym components (nlev,nhgl) 105: END INTERFACE ! ! spectral space ! INTERFACE scatter_sp 110: MODULE PROCEDURE scatter_sp4 ! scatter spectral field (nlev, 2, nsp,1) MODULE PROCEDURE scatter_sp3 ! scatter spectral field (nlev, 2, nsp) MODULE PROCEDURE scatter_sp0 ! scatter only m=0 coeff. (nlev, nnp1) END INTERFACE 115: INTERFACE gather_sp MODULE PROCEDURE gather_sp4 ! scatter full spectral field (nlev, 2, nsp,1) MODULE PROCEDURE gather_sp3 ! scatter full spectral field (nlev, 2, nsp) MODULE PROCEDURE gather_sp0 ! scatter only m=0 wave number (nlev, nnp1) END INTERFACE 120: INTERFACE indx MODULE PROCEDURE indx0 MODULE PROCEDURE indx2 END INTERFACE 125: ! ! define tags ! INTEGER, PARAMETER :: tag_scatter_gp = 100 INTEGER, PARAMETER :: tag_gather_gp = 101 130: INTEGER, PARAMETER :: tag_scatter_ls = 110 INTEGER, PARAMETER :: tag_gather_ls = 111 INTEGER, PARAMETER :: tag_scatter_sp = 120 INTEGER, PARAMETER :: tag_gather_sp = 121 INTEGER, PARAMETER :: tag_scatter_sa = 130 135: INTEGER, PARAMETER :: tag_gather_sa = 131 INTEGER, PARAMETER :: tag_tr_gp_fs = 200 INTEGER, PARAMETER :: tag_tr_fs_ls = 210 INTEGER, PARAMETER :: tag_tr_ls_sp = 220 !============================================================================== 140: CONTAINS !============================================================================== SUBROUTINE scatter_gp432 (gl, lc, gl_dc) ! ! scatter global grid point field from pe's (nlon,nlev,ntrac,nlat) 145: ! or (nlon,nlev,nlat ,1 ) ! or (nlon,nlat,1 ,1 ) ! REAL ,POINTER :: gl (:,:,:,:) ! global field REAL ,INTENT(out) :: lc (:,:,:,:) ! local field 150: TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(0:) ! global decomposition INTEGER :: size4 ! size of 4rd dimension INTEGER :: i REAL ,POINTER :: gl3(:,:,:) ! 155: ! call 3D scatter routine if 4th dimension size is 1 ! else loop over 3th index ! IF (p_pe == p_io) size4 = (SIZE(gl,4)) CALL p_bcast (size4, p_io) 160: IF (size4 == 1) THEN gl3 => gl(:,:,:,1) CALL scatter_gp32 (gl3, lc(:,:,:,1), gl_dc) ELSE DO i=1,SIZE(lc,3) 165: gl3 => gl(:,:,i,:) CALL scatter_gp32 (gl3, lc(:,:,i,:), gl_dc) END DO ENDIF END SUBROUTINE scatter_gp432 170: !------------------------------------------------------------------------------ SUBROUTINE scatter_gp32 (gl, lc, gl_dc) ! ! send global grid point field to pe's (nlon,nlev,nlat) or (nlon,nlat,1) ! 175: REAL ,POINTER :: gl (:,:,:) ! global field REAL ,INTENT(out) :: lc (:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(0:) ! global decomposition INTEGER :: size3 ! size of 3rd dimension REAL ,POINTER :: gl2(:,:) 180: ! ! call 2D scatter routine if 3rd dimension size is 1 ! else call 3D scatter routine ! IF (p_pe == p_io) size3 = (SIZE(gl,3)) 185: CALL p_bcast (size3, p_io) IF (size3 == 1) THEN NULLIFY(gl2) IF (p_pe == p_io) gl2 => gl(:,:,1) CALL scatter_gp2 (gl2, lc(:,:,1), gl_dc) 190: ELSE CALL scatter_gp3 (gl, lc, gl_dc) ENDIF END SUBROUTINE scatter_gp32 !------------------------------------------------------------------------------ 195: SUBROUTINE scatter_gp2 (gl, lc, gl_dc) ! ! send global 2D grid point field to local pe's (nlon,nlat) ! REAL ,POINTER :: gl (:,:) ! global field 200: REAL ,INTENT(out) :: lc (:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(:) ! global decomposition ! ! Data structure: ! 205: ! global grid point field GL: (1:nlon (+2), 1:nlat) ! ! local grid point field LC: (1:nglon(+2), 1:nglat) ! ! The actual size of the first index may or may be not larger than 210: ! NLON or NGLON ! ! The local grid point field LC covers two distinct areas at opposite ! sides of the globe. The array elements correspond with the global ! field as follows: 215: ! ! LC (1:nglon,1:nglat/2 ) = GL (glons(1):glone(1),glats(1):glate(1)) ! LC (1:nglon,nglat/2+1:) = GL (glons(2):glone(2),glats(2):glate(2)) ! ! local variables 220: ! REAL ,ALLOCATABLE :: buf (:,:) ! buffer INTEGER :: imype ! index of this PE INTEGER :: i ! loop index INTEGER :: pe ! processor to communicate with 225: INTEGER :: nlon ! global number of longitudes INTEGER :: nglon ! local imype = indx (p_pe, gl_dc) nlon = gl_dc(imype)% nlon ! 230: ! send if pe = p_io ! IF (p_pe == p_io) THEN DO i = 1, p_nprocs pe = gl_dc(i)% pe 235: ALLOCATE (buf (gl_dc(i)% nglon, gl_dc(i)% nglat)) ! ! pack first segment ! buf(:,:gl_dc(i)% nglat/2) = & 240: gl (gl_dc(i)% glons(1) : gl_dc(i)% glone(1), & gl_dc(i)% glats(1) : gl_dc(i)% glate(1)) ! ! pack second segment ! 245: IF (gl_dc(i)% glone(2)>gl_dc(i)% glons(2)) THEN buf(:,gl_dc(i)% nglat/2+1:) = & gl (gl_dc(i)% glons(2) : gl_dc(i)% glone(2), & gl_dc(i)% glats(2) : gl_dc(i)% glate(2)) ELSE 250: ! ! pack second segment, split in longitudes ! buf(:nlon-gl_dc(i)% glons(2)+1,gl_dc(i)% nglat/2+1:) = & gl (gl_dc(i)% glons(2) : nlon, & 255: gl_dc(i)% glats(2) : gl_dc(i)% glate(2)) buf(gl_dc(i)% nglon-gl_dc(i)% glone(2)+1:, & gl_dc(i)% nglat/2+1:) = & gl (1: gl_dc(i)% glone(2), & gl_dc(i)% glats(2) : gl_dc(i)% glate(2)) 260: ENDIF ! ! send ! IF (pe /= p_pe) THEN 265: CALL p_send( buf, pe, tag_scatter_gp) ELSE nglon = gl_dc(i)% nglon lc( :nglon,:) = buf lc(nglon+1: ,:) = 0. 270: ENDIF DEALLOCATE (buf) END DO ELSE ! 275: ! receive if (p_io /= p_pe) ! nglon = gl_dc(imype)% nglon CALL p_recv (lc(:nglon,:), p_io, tag_scatter_gp) lc(nglon+1:,:) = 0. 280: END IF END SUBROUTINE scatter_gp2 !------------------------------------------------------------------------------ SUBROUTINE scatter_gp3 (gl, lc, gl_dc) ! 285: ! send global 3D grid point field to local pe's (nlon,nlev,nlat) ! REAL ,POINTER :: gl (:,:,:) ! global field REAL ,INTENT(out) :: lc (:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(:) ! global decomposition 290: ! ! Data structure: ! ! global grid point field GL: (1:nlon (+2), 1:nlev, 1:nlat) ! 295: ! local grid point field LC: (1:nglon(+2), 1:nlev, 1:nglat) ! ! The actual size of the first index may or may be not larger than ! NLON or NGLON ! 300: ! The local grid point field LC covers two distinct areas at opposite ! sides of the globe. The array elements correspond with the global ! field as follows: ! ! LC (1:nglon,:,1:nglat/2 ) = GL (glons(1):glone(1),:,glats(1):glate(1)) 305: ! LC (1:nglon,:,nglat/2+1:) = GL (glons(2):glone(2),:,glats(2):glate(2)) ! ! local variables ! REAL ,ALLOCATABLE :: buf (:,:,:) ! buffer 310: INTEGER :: i ! loop index INTEGER :: pe ! processor to communicate with INTEGER :: nlon ! global number of longitudes INTEGER :: nglon ! local INTEGER :: imype ! index of this pe 315: imype = indx (p_pe, gl_dc) nlon = gl_dc(1)% nlon ! ! send if pe = p_io ! 320: IF (p_pe == p_io) THEN DO i = 1, p_nprocs pe = gl_dc(i)% pe ALLOCATE (buf (gl_dc(i)% nglon, SIZE(gl,2), gl_dc(i)% nglat)) ! 325: ! pack first segment ! buf(:,:,:gl_dc(i)% nglat/2) = & gl (gl_dc(i)% glons(1) : gl_dc(i)% glone(1), : , & gl_dc(i)% glats(1) : gl_dc(i)% glate(1)) 330: ! ! pack second segment ! IF (gl_dc(i)% glone(2)>gl_dc(i)% glons(2)) THEN buf(:,:,gl_dc(i)% nglat/2+1:) = & 335: gl (gl_dc(i)% glons(2) : gl_dc(i)% glone(2), : , & gl_dc(i)% glats(2) : gl_dc(i)% glate(2)) ELSE ! ! pack second segment, split in longitudes 340: ! buf(:nlon-gl_dc(i)% glons(2)+1,:,gl_dc(i)% nglat/2+1:) = & gl (gl_dc(i)% glons(2) : nlon, : , & gl_dc(i)% glats(2) : gl_dc(i)% glate(2)) buf(gl_dc(i)% nglon-gl_dc(i)% glone(2)+1:,:,& 345: gl_dc(i)% nglat/2+1:) = & gl (1: gl_dc(i)% glone(2), : , & gl_dc(i)% glats(2) : gl_dc(i)% glate(2)) ENDIF ! 350: ! send ! IF (pe /= p_pe) THEN CALL p_send( buf, pe, tag_scatter_gp) ELSE 355: nglon = gl_dc(i)% nglon lc( :nglon,:,:) = buf lc(nglon+1: ,:,:) = 0. ENDIF DEALLOCATE (buf) 360: END DO ELSE ! ! receive if (p_io /= p_pe) ! 365: nglon = gl_dc(imype)% nglon CALL p_recv (lc(:nglon,:,:), p_io, tag_scatter_gp) lc(nglon+1:,:,:) = 0. END IF END SUBROUTINE scatter_gp3 370: !============================================================================== SUBROUTINE gather_gp432 (gl, lc, gl_dc, source) ! ! gather global grid point field from pe's (nlon,nlev,ntrac,nlat) ! or (nlon,nlev,nlat ,1 ) 375: ! or (nlon,nlat,1 ,1 ) ! REAL ,POINTER :: gl (:,:,:,:) ! global field REAL ,INTENT(in) :: lc (:,:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(0:) ! global decomposition 380: INTEGER, OPTIONAL ,INTENT(in) :: source ! source to gather from ! ! -1=all;0=p_io;1=not p_io INTEGER :: size4 ! size of 4rd dimension INTEGER :: i REAL ,POINTER :: gl3(:,:,:) 385: ! ! call 2D gather routine if 4th dimension size is 1 ! else loop over 3th index ! IF (p_pe == p_io) size4 = (SIZE(gl,4)) 390: CALL p_bcast (size4, p_io) IF (size4 == 1) THEN gl3 => gl(:,:,:,1) CALL gather_gp32 (gl3, lc(:,:,:,1), gl_dc, source) ELSE 395: DO i=1,SIZE(lc,3) gl3 => gl(:,:,i,:) CALL gather_gp32 (gl3, lc(:,:,i,:), gl_dc, source) END DO ENDIF 400: END SUBROUTINE gather_gp432 !------------------------------------------------------------------------------ SUBROUTINE gather_gp32 (gl, lc, gl_dc, source) ! ! gather global grid point field from pe's (nlon,nlev,nlat) or (nlon,nlat,1) 405: ! REAL ,POINTER :: gl (:,:,:) ! global field REAL ,INTENT(in) :: lc (:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(0:) ! global decomposition INTEGER, OPTIONAL ,INTENT(in) :: source ! source to gather from 410: ! ! -1=all;0=p_io;1=not p_io INTEGER :: size3 ! size of 3rd dimension REAL ,POINTER :: gl2(:,:) ! ! call 2D gather routine if 3rd dimension size is 1 415: ! else call 3D gather routine ! IF (p_pe == p_io) size3 = (SIZE(gl,3)) CALL p_bcast (size3, p_io) IF (size3 == 1) THEN 420: gl2 => gl(:,:,1) CALL gather_gp2 (gl2, lc(:,:,1), gl_dc, source) ELSE CALL gather_gp3 (gl, lc, gl_dc, source) ENDIF 425: END SUBROUTINE gather_gp32 !------------------------------------------------------------------------------ SUBROUTINE gather_gp2 (gl, lc, gl_dc, source) ! ! receive global grid point field from local pe's (nlon,nlat) 430: ! REAL ,POINTER :: gl (:,:) ! global field REAL ,INTENT(in) :: lc (:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(:) ! global decomposition INTEGER, OPTIONAL ,INTENT(in) :: source ! source to gather from 435: ! ! -1=all;0=p_io;1=not p_io ! Data structure: As described in scatter_gp ! ! local variables ! 440: REAL ,ALLOCATABLE :: buf (:,:) ! buffer INTEGER :: i ! loop index INTEGER :: pe ! processor to communicate with INTEGER :: nlon ! global number of longitudes INTEGER :: nglon ! local number of longitudes 445: INTEGER :: imype ! index of this pe INTEGER :: src ! source src = debug_parallel IF (debug_parallel >= 0 .AND. PRESENT(source)) src = source imype = indx (p_pe, gl_dc) 450: nlon = gl_dc(1)% nlon ! ! send if pe /= p_io ! IF (p_pe /= p_io) THEN 455: nglon = gl_dc(imype)% nglon CALL p_send (lc(:nglon,:), p_io, tag_gather_gp) ELSE DO i = 1, p_nprocs pe = gl_dc(i)% pe 460: nglon = gl_dc(i)% nglon ALLOCATE (buf (nglon, gl_dc(i)% nglat)) ! ! receive ! 465: IF (pe /= p_pe) THEN CALL p_recv( buf, pe, tag_gather_gp) ELSE buf = lc(:nglon,:) ENDIF 470: IF(src==-1 .OR. (src==0 .AND. p_io==pe) & .OR. (src==1 .AND. p_io/=pe)) THEN ! ! unpack first segment ! 475: gl (gl_dc(i)% glons(1) : gl_dc(i)% glone(1), & gl_dc(i)% glats(1) : gl_dc(i)% glate(1)) = & buf(:,:gl_dc(i)% nglat/2) ! ! unpack second segment 480: ! IF (gl_dc(i)% glone(2)>gl_dc(i)% glons(2)) THEN gl (gl_dc(i)% glons(2) : gl_dc(i)% glone(2), & gl_dc(i)% glats(2) : gl_dc(i)% glate(2)) = & buf(:,gl_dc(i)% nglat/2+1:) 485: ELSE ! ! unpack second segment, split in longitudes ! gl (gl_dc(i)% glons(2) : nlon, & 490: gl_dc(i)% glats(2) : gl_dc(i)% glate(2)) = & buf(:nlon-gl_dc(i)% glons(2)+1,gl_dc(i)% nglat/2+1:) gl (1: gl_dc(i)% glone(2), & gl_dc(i)% glats(2) : gl_dc(i)% glate(2)) = & buf(gl_dc(i)% nglon-gl_dc(i)% glone(2)+1:, & 495: gl_dc(i)% nglat/2+1:) ENDIF ENDIF DEALLOCATE (buf) END DO 500: ! ! set elements with i>nlon to zero ! gl (nlon+1:,:) = 0. ENDIF 505: END SUBROUTINE gather_gp2 !------------------------------------------------------------------------------ SUBROUTINE gather_gp3 (gl, lc, gl_dc, source) ! ! receive global grid point field from local pe's (nlon,nlev,nlat) 510: ! REAL ,POINTER :: gl (:,:,:) ! global field REAL ,INTENT(in) :: lc (:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(:) ! global decomposition INTEGER, OPTIONAL ,INTENT(in) :: source ! source to gather from 515: ! ! -1=all;0=p_io;1=not p_io ! Data structure: As described in scatter_gp ! ! local variables ! 520: REAL ,ALLOCATABLE :: buf (:,:,:) ! buffer INTEGER :: i ! loop index INTEGER :: pe ! processor to communicate with INTEGER :: nlon ! global number of longitudes INTEGER :: nglon ! local number of longitudes 525: INTEGER :: imype ! index of this pe INTEGER :: src ! source src = debug_parallel IF (debug_parallel >= 0 .AND. PRESENT(source)) src = source imype = indx (p_pe, gl_dc) 530: nlon = gl_dc(1)% nlon ! ! send if pe /= p_io ! IF (p_pe /= p_io) THEN 535: nglon = gl_dc(imype)% nglon CALL p_send (lc(:nglon,:,:), p_io, tag_gather_gp) ELSE DO i = 1, p_nprocs pe = gl_dc(i)% pe 540: nglon = gl_dc(i)% nglon ALLOCATE (buf (nglon, SIZE(gl,2), gl_dc(i)% nglat)) ! ! receive ! 545: IF (pe /= p_pe) THEN CALL p_recv( buf, pe, tag_gather_gp) ELSE buf = lc(:nglon,:,:) ENDIF 550: IF(src==-1 .OR. (src==0 .AND. p_io==pe) & .OR. (src==1 .AND. p_io/=pe)) THEN ! ! unpack first segment ! 555: gl (gl_dc(i)% glons(1) : gl_dc(i)% glone(1),:, & gl_dc(i)% glats(1) : gl_dc(i)% glate(1)) = & buf(:,:,:gl_dc(i)% nglat/2) ! ! unpack second segment 560: ! IF (gl_dc(i)% glone(2)>gl_dc(i)% glons(2)) THEN gl (gl_dc(i)% glons(2) : gl_dc(i)% glone(2),:, & gl_dc(i)% glats(2) : gl_dc(i)% glate(2)) = & buf(:,:,gl_dc(i)% nglat/2+1:) 565: ELSE ! ! unpack second segment, split in longitudes ! gl (gl_dc(i)% glons(2) : nlon, : , & 570: gl_dc(i)% glats(2) : gl_dc(i)% glate(2)) = & buf(:nlon-gl_dc(i)% glons(2)+1,:,gl_dc(i)% nglat/2+1:) gl (1: gl_dc(i)% glone(2), : , & gl_dc(i)% glats(2) : gl_dc(i)% glate(2)) = & buf(gl_dc(i)% nglon-gl_dc(i)% glone(2)+1:,:, & 575: gl_dc(i)% nglat/2+1:) ENDIF ENDIF DEALLOCATE (buf) END DO 580: ! ! set elements with i>nlon to zero ! gl (nlon+1:,:,:) = 0. ENDIF 585: END SUBROUTINE gather_gp3 !============================================================================== SUBROUTINE scatter_ls3 (gl, lc, gl_dc) ! ! send global spectral field to Legendre space (nlev, 2, nspec) 590: ! REAL ,POINTER :: gl (:,:,:) ! global field REAL ,INTENT(out) :: lc (:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(:) ! global decomposition ! 595: ! Data structures: ! ! global spectral field: GL (1:nlev [+1] ,2,1:nspec ) ! local Legendre space: LC (1:nllev | nllevp1,2,1:lnsp) ! 600: ! The levels 1:NLLEVP1 in LC correspond to LLEVS:LLEVE in GL ! ! GL covers the longitudinal wave numbers m=0..nm ! The coefficients corresponding to wave number (n,m) are stored in ! GL(:,:,nmp(m+1)+n+1) 605: ! ! LC covers NLM longitudinal wave numbers m=LM(i), i=1,NLM. ! The coefficients corresponding to wave number (n,m) are stored in ! LC(:,:,nlmp(i)+n+1) ! 610: ! local variables ! REAL ,ALLOCATABLE :: buf (:,:,:) ! buffer (lev,2,nsp) INTEGER :: i, im ! loop indices INTEGER :: pe ! processor to communicate with 615: INTEGER :: mp1 ! wavenumber m+1 INTEGER :: llevs, lleve, nllevp1, lnsp, nlm INTEGER :: ke, nk INTEGER :: mp, np ! displacement, no n waves per m INTEGER :: mpgl ! displacement on global processor 620: ! ! send if pe = p_io ! IF (p_pe == p_io) THEN DO i = 1, p_nprocs 625: pe = gl_dc(i)% pe ! ! short names ! nllevp1 = gl_dc(i)% nllevp1 ! number of levels handled by pe 630: llevs = gl_dc(i)% llevs ! start level lleve = gl_dc(i)% lleve ! end level nlm = gl_dc(i)% nlm ! number of m vavenumbers handled by pe lnsp = gl_dc(i)% lnsp ! number of spectral (complex) coeff. ke = MIN (lleve,SIZE(gl,1)) 635: nk = ke - llevs + 1 ! number of levels to send IF (nk * lnsp < 1) CYCLE ! ! ALLOCATE (buf (nk, 2, lnsp)) 640: ! ! pack ! mp=0 DO im=1,nlm 645: mp1 = gl_dc(i)% lm(im)+1 np = gl_dc(i)% nnp (mp1) mpgl = gl_dc(i)% nmp (mp1) buf ( : ,:,mp +1:mp +np ) = & gl(llevs:ke,:,mpgl+1:mpgl+np) 650: mp = mp + np END DO IF (mp /= lnsp) THEN WRITE(nerr,*) 'scatter_ls: PE',pe,',mp/=lnsp:',mp,lnsp CALL finish('scatter_ls','mp/=lnsp') 655: ENDIF ! ! send ! IF (pe /= p_pe) THEN 660: CALL p_send( buf, pe, tag_scatter_ls) ELSE lc(:,:,:) = buf ENDIF DEALLOCATE (buf) 665: END DO ELSE ! ! receive if (p_io /= p_pe) ! 670: IF(SIZE(lc)>0) CALL p_recv (lc(:,:,:), p_io, tag_scatter_ls) END IF END SUBROUTINE scatter_ls3 !------------------------------------------------------------------------------ SUBROUTINE scatter_ls0 (gl, lc, gl_dc) 675: ! ! send global spectral field to Legendre space (m=0 only) (nlev,nnp1) ! REAL ,POINTER :: gl (:,:) ! global field REAL ,INTENT(out) :: lc (:,:) ! local field 680: TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(:) ! global decomposition ! ! Data structures: ! 685: ! global spectral field: GL (1:nlev [+1] ,1:nnp1 ) ! local Legendre space: LC (1:nllev | nllevp1,1:nlnm0) ! ! The levels 1:NLLEVP1 in LC correspond to LLEVS:LLEVE in GL ! 690: ! GL covers the longitudinal wave numbers m=0..nm ! The coefficients corresponding to wave number (n,m) are stored in ! GL(:,nmp(m+1)+n+1,:) ! ! LC covers NLM longitudinal wave numbers m=LM(i), i=1,NLM. 695: ! The coefficients corresponding to wave number (n,m) are stored in ! LC(:,nlmp(i)+n+1,:) ! ! local variables ! 700: REAL ,ALLOCATABLE :: buf (:,:) ! buffer (lev,nnp1) INTEGER :: i ! loop indices INTEGER :: pe ! processor to communicate with INTEGER :: llevs, lleve, nllevp1 INTEGER :: ke, nk ! actuall last level, number of levs 705: INTEGER :: nlnm0 ! number of coefficiens for m=0 ! ! send if pe = p_io ! IF (p_pe == p_io) THEN 710: DO i = 1, p_nprocs pe = gl_dc(i)% pe ! ! short names ! 715: nllevp1 = gl_dc(i)% nllevp1 ! number of levels handled by pe llevs = gl_dc(i)% llevs ! start level lleve = gl_dc(i)% lleve ! end level nlnm0 = gl_dc(i)% nlnm0 ! number of coefficiens for m=0 IF (nlnm0 == 0) CYCLE 720: ke = MIN (lleve,SIZE(gl,1)) nk = ke - llevs + 1 ! number of levels to send IF (nk < 1) CYCLE ! ALLOCATE (buf (nk, nlnm0)) 725: ! ! pack ! buf (:,:) = gl(llevs:ke,:) ! 730: ! send ! IF (pe /= p_pe) THEN CALL p_send( buf, pe, tag_scatter_ls) ELSE 735: lc(:,:) = buf ENDIF DEALLOCATE (buf) END DO ELSE 740: ! ! receive if (p_io /= p_pe) ! IF(SIZE(lc)>0) CALL p_recv (lc(:,:), p_io, tag_scatter_ls) END IF 745: END SUBROUTINE scatter_ls0 !------------------------------------------------------------------------------ SUBROUTINE gather_ls3 (gl, lc, gl_dc, source) ! ! receive global spectral field from Legendre space (nlev, 2, nsp) 750: ! REAL ,POINTER :: gl (:,:,:) ! global field REAL ,INTENT(in) :: lc (:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc (:) ! global decomposition INTEGER, OPTIONAL ,INTENT(in) :: source ! -1=all;0=p_io;1=not p_io 755: ! ! Data structures: as described in scatter_ls ! REAL ,ALLOCATABLE :: buf (:,:,:) ! buffer (lev,2,nsp) INTEGER :: i, im ! loop indices 760: INTEGER :: pe ! processor to communicate with INTEGER :: mp1 ! wavenumber m+1 INTEGER :: llevs, lleve, nllevp1, lnsp, nlm INTEGER :: ke, nk INTEGER :: mp, np ! displacement, no n waves per m 765: INTEGER :: mpgl ! displacement on global processor INTEGER :: src ! local copy of 'source' src = debug_parallel IF (debug_parallel >= 0 .AND. PRESENT(source)) src = source ! 770: ! Data structures: as defined in scatter_ls ! ! receive if pe = p_io ! IF (p_pe == p_io) THEN 775: DO i = 1, p_nprocs pe = gl_dc(i)% pe ! ! short names ! 780: nllevp1 = gl_dc(i)% nllevp1 ! number of levels handled by pe llevs = gl_dc(i)% llevs ! start level lleve = gl_dc(i)% lleve ! end level nlm = gl_dc(i)% nlm ! number of m vavenumbers handled by pe lnsp = gl_dc(i)% lnsp ! number of spectral (complex) coeff. 785: ke = MIN (lleve,SIZE(gl,1)) nk = ke - llevs + 1 ! number of levels to send IF (nk < 1) CYCLE ! ! 790: ALLOCATE (buf (nk, 2, lnsp)) ! ! receive ! IF (pe /= p_pe) THEN 795: CALL p_recv( buf, pe, tag_gather_ls) ELSE buf = lc(:,:,:) ENDIF ! 800: ! unpack ! IF(src==-1 .OR. (src==0 .AND. p_io==pe) & .OR. (src==1 .AND. p_io/=pe)) THEN mp=0 805: DO im=1,nlm mp1 = gl_dc(i)% lm(im)+1 np = gl_dc(i)% nnp (mp1) mpgl = gl_dc(i)% nmp (mp1) gl (llevs:ke,:,mpgl+1:mpgl+np) = & 810: buf ( : ,:,mp +1:mp +np ) mp = mp + np END DO IF (mp /= lnsp) THEN WRITE(nerr,*) 'gather_ls: PE',pe,',mp/=lnsp:',mp,lnsp 815: CALL finish('gather_ls','mp/=lnsp') ENDIF ENDIF DEALLOCATE (buf) END DO 820: ELSE ! ! send if (p_io /= p_pe) ! IF(SIZE(lc,1)>0) THEN 825: CALL p_send (lc(:,:,:), p_io, tag_gather_ls) END IF END IF END SUBROUTINE gather_ls3 !------------------------------------------------------------------------------ 830: SUBROUTINE gather_ls0 (gl, lc, gl_dc, source) ! ! receive global spectral field from Legendre space (m=0 only) (nlev,nnp1) ! REAL ,POINTER :: gl (:,:) ! global field 835: REAL ,INTENT(in) :: lc (:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc (:) ! global decomposition INTEGER, OPTIONAL ,INTENT(in) :: source ! -1=all;0=p_io;1=not p_io ! ! Data structures: as described in scatter_ls 840: ! REAL ,ALLOCATABLE :: buf (:,:) ! buffer (lev,2,nsp) INTEGER :: i ! loop indices INTEGER :: pe ! processor to communicate with INTEGER :: llevs, lleve, nllevp1, nlnm0 845: INTEGER :: ke, nk INTEGER :: src src = debug_parallel IF (debug_parallel >= 0 .AND. PRESENT(source)) src = source ! 850: ! Data structures: as defined in scatter_ls ! ! receive if pe = p_io ! IF (p_pe == p_io) THEN 855: DO i = 1, p_nprocs pe = gl_dc(i)% pe ! ! short names ! 860: nllevp1 = gl_dc(i)% nllevp1 ! number of levels handled by pe llevs = gl_dc(i)% llevs ! start level lleve = gl_dc(i)% lleve ! end level nlnm0 = gl_dc(i)% nlnm0 ! number of coefficients for m=0 ke = MIN (lleve,SIZE(gl,1)) 865: nk = ke - llevs + 1 ! number of levels to send IF (nk*nlnm0 < 1) CYCLE ! ! ALLOCATE (buf (nk, nlnm0)) 870: ! ! receive ! IF (pe /= p_pe) THEN CALL p_recv( buf, pe, tag_gather_ls) 875: ELSE buf = lc(:,:) ENDIF ! ! unpack 880: ! IF(src==-1 .OR. (src==0 .AND. p_io==pe) & .OR. (src==1 .AND. p_io/=pe)) THEN gl (llevs:ke,:) = buf (:,:) ENDIF 885: DEALLOCATE (buf) END DO ELSE ! ! send if (p_io /= p_pe) 890: ! IF(SIZE(lc)>0) THEN CALL p_send (lc(:,:), p_io, tag_gather_ls) END IF END IF 895: END SUBROUTINE gather_ls0 !------------------------------------------------------------------------------ SUBROUTINE gather_ls4 (gl, lc, gl_dc, source) ! ! receive global spectral field from Legendre space (nmp1*2,nlev,nlat,nvar) 900: ! REAL ,POINTER :: gl (:,:,:,:) ! global field REAL ,INTENT(in) :: lc (:,:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc (:) ! global decomposition INTEGER, OPTIONAL ,INTENT(in) :: source ! 905: ! ! local variables ! REAL ,ALLOCATABLE :: buf (:,:,:,:) ! buffer (nlmp1*2,nlev,nlat,nvar) INTEGER :: i, im ! loop indices 910: INTEGER :: pe ! processor to communicate with INTEGER :: mp1 ! wavenumber m+1 INTEGER :: llevs, lleve, nllevp1, nlm, nlat, nvar INTEGER :: ke, nk INTEGER :: src ! local copy of 'source' 915: src = debug_parallel IF (debug_parallel >= 0 .AND. PRESENT(source)) src = source ! ! Data structures: as defined in scatter_ls ! 920: ! receive if pe = p_io ! IF (p_pe == p_io) THEN DO i = 1, p_nprocs pe = gl_dc(i)% pe 925: ! ! short names ! nllevp1 = gl_dc(i)% nllevp1 ! number of levels handled by pe llevs = gl_dc(i)% llevs ! start level 930: lleve = gl_dc(i)% lleve ! end level nlm = gl_dc(i)% nlm ! number of m vavenumbers handled by pe nlat = gl_dc(i)% nlat nvar = SIZE(gl,4) ke = MIN (lleve,SIZE(gl,2)) 935: nk = ke - llevs + 1 ! number of levels to send IF (nk < 1) CYCLE ! ! ALLOCATE (buf (nlm*2,nk,nlat,nvar)) 940: ! ! receive ! IF (pe /= p_pe) THEN CALL p_recv( buf, pe, tag_gather_ls) 945: ELSE buf = lc(:,:,:,:) ENDIF ! ! unpack 950: ! IF(src==-1 .OR. (src==0 .AND. p_io==pe) & .OR. (src==1 .AND. p_io/=pe)) THEN DO im=1,nlm mp1 = gl_dc(i)% lm(im)+1 955: gl (mp1*2-1,llevs:ke,:,:) = buf (im*2-1,:,:,:) gl (mp1*2 ,llevs:ke,:,:) = buf (im*2 ,:,:,:) END DO ENDIF DEALLOCATE (buf) 960: END DO ELSE ! ! send if (p_io /= p_pe) ! 965: IF(SIZE(lc,1)>0) THEN CALL p_send (lc(:,:,:,:), p_io, tag_gather_ls) END IF END IF END SUBROUTINE gather_ls4 970: !============================================================================== SUBROUTINE scatter_sp4 (gl, lc, gl_dc) ! ! scatter global grid point field from pe's (nlon,nlev,ntrac,nlat) ! or (nlon,nlev,nlat ,1 ) 975: ! or (nlon,nlat,1 ,1 ) ! REAL ,POINTER :: gl (:,:,:,:) ! global field REAL ,INTENT(out) :: lc (:,:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(0:) ! global decomposition 980: INTEGER :: size4 ! size of 4rd dimension INTEGER :: size3 ! size of 3rd dimension REAL ,POINTER :: gl3(:,:,:) REAL ,POINTER :: gl2(:,:) ! 985: ! call 3D scatter routine if 4th dimension size is 1 ! else loop over 3rd index, call 2D scatter routine if 3rd dim size is 1 ! else call 3D scatter routine ! IF (p_pe == p_io) THEN 990: size4 = (SIZE(gl,4)) IF (size4/=1) CALL finish('scatter_sp4','size4/=1') size3 = (SIZE(gl,3)) ENDIF CALL p_bcast (size3, p_io) 995: IF (size3==1) THEN gl2 => gl(:,:,1,1) CALL scatter_sp0 (gl2, lc(:,:,1,1), gl_dc) ELSE gl3 => gl(:,:,:,1) 1000: CALL scatter_sp3 (gl3, lc(:,:,:,1), gl_dc) ENDIF END SUBROUTINE scatter_sp4 !------------------------------------------------------------------------------ SUBROUTINE scatter_sp3 (gl, lc, gl_dc) 1005: ! ! send global spectral field to spectral space (nlev,2,nsp) ! REAL ,POINTER :: gl (:,:,:) ! global field REAL ,INTENT(out) :: lc (:,:,:) ! local field 1010: TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(:) ! global decomposition ! ! Data structures: ! ! global spectral field: GL (1:nlev ,2,1:nspec ) 1015: ! local Legendre space: LC (1:nlev ,2,1:snsp) ! ! GL covers the longitudinal wave numbers m=0..nm ! The coefficients corresponding to wave number (n,m) are stored in ! GL(:,:,nmp(m+1)+n+1) 1020: ! ! LS covers NSM longitudinal wave numbers m=SM(i), i=1,NSM. ! The coefficients corresponding to wave number (n,m) are stored in ! LS(:,:,nsmp(i)+n+1-snn0(i)) ! 1025: ! local variables ! REAL ,ALLOCATABLE :: buf (:,:,:) ! buffer (lev,2,nsp) INTEGER :: i, im ! loop indices INTEGER :: pe ! processor to communicate with 1030: INTEGER :: nlev ! number of levels INTEGER :: mp1 ! wavenumber m+1 INTEGER :: snsp ! number of spectral coefficients on PE INTEGER :: nsm ! number of m wavenumbers on pe INTEGER :: mp, np ! displacement, no n waves per m 1035: INTEGER :: mpgl ! displacement on global processor ! ! send if pe = p_io ! IF (p_pe == p_io) THEN 1040: DO i = 1, p_nprocs pe = gl_dc(i)% pe ! ! short names ! 1045: nsm = gl_dc(i)% nsm ! number of m vavenumbers handled by pe snsp = gl_dc(i)% snsp ! number of spectral (complex) coeff. nlev = SIZE(gl,1) IF (snsp < 1) CYCLE ! 1050: ! ALLOCATE (buf (nlev, 2, snsp)) ! ! pack ! 1055: mp=0 DO im=1,nsm mp1 = gl_dc(i)% sm(im)+1 np = gl_dc(i)% snnp(im) mpgl = gl_dc(i)% nmp (mp1) + gl_dc(i)% snn0(im) 1060: buf (:,:,mp +1:mp +np ) = & gl(:,:,mpgl+1:mpgl+np) mp = mp + np END DO IF (mp /= snsp) THEN 1065: WRITE(nerr,*) 'scatter_sp: PE',pe,',mp/=snsp:',mp,snsp CALL finish('scatter_sp','mp/=snsp') ENDIF ! ! send 1070: ! IF (pe /= p_pe) THEN CALL p_send( buf, pe, tag_scatter_sp) ELSE lc(:,:,:) = buf 1075: ENDIF DEALLOCATE (buf) END DO ELSE ! 1080: ! receive if (p_io /= p_pe) ! IF(SIZE(lc)>0) CALL p_recv (lc(:,:,:), p_io, tag_scatter_sp) END IF END SUBROUTINE scatter_sp3 1085: !------------------------------------------------------------------------------ SUBROUTINE scatter_sp0 (gl, lc, gl_dc) ! ! send global spectral field to spectral space (m=0 only) (nlev, nnp1) ! 1090: REAL ,POINTER :: gl (:,:) ! global field REAL ,INTENT(out) :: lc (:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(:) ! global decomposition ! ! Data structures: 1095: ! ! local variables ! REAL ,ALLOCATABLE :: buf (:,:) ! buffer (lev,nnp1) INTEGER :: i ! loop indices 1100: INTEGER :: pe ! processor to communicate with INTEGER :: nsnm0 ! number of coefficiens for m=0 on PE INTEGER :: snn0 ! number of first n coefficient on PE INTEGER :: nlev ! number of levels ! 1105: ! send if pe = p_io ! IF (p_pe == p_io) THEN DO i = 1, p_nprocs pe = gl_dc(i)% pe 1110: ! ! short names ! nlev = SIZE(gl,1) ! number of levels nsnm0 = gl_dc(i)% nsnm0 ! number of coefficiens for m=0 on PE 1115: IF (nsnm0 == 0) CYCLE snn0 = gl_dc(i)% snn0(1) ! number of first n coefficient on PE ! ALLOCATE (buf (nlev, nsnm0)) ! 1120: ! pack ! buf (:,:) = gl(:,1+snn0:nsnm0+snn0) ! ! send 1125: ! IF (pe /= p_pe) THEN CALL p_send( buf, pe, tag_scatter_sp) ELSE lc(:,:) = buf 1130: ENDIF DEALLOCATE (buf) END DO ELSE ! 1135: ! receive if (p_io /= p_pe) ! IF(SIZE(lc)>0) CALL p_recv (lc(:,:), p_io, tag_scatter_sp) END IF END SUBROUTINE scatter_sp0 1140: !------------------------------------------------------------------------------ SUBROUTINE gather_sp4 (gl, lc, gl_dc, source) ! ! gather global grid point field from pe's (nlon,nlev,nlat ,1 ) ! or (nlon,nlat,1 ,1 ) 1145: ! REAL ,POINTER :: gl (:,:,:,:) ! global field REAL ,INTENT(in) :: lc (:,:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(0:) ! global decomposition INTEGER, OPTIONAL ,INTENT(in) :: source ! source to gather from 1150: ! ! -1=all;0=p_io;1=not p_io INTEGER :: size4 ! size of 4rd dimension INTEGER :: size3 ! size of 3rd dimension REAL ,POINTER :: gl3(:,:,:) REAL ,POINTER :: gl2(:,:) 1155: ! ! abort if 4th dimension size is 1 ! call 2D gather routine if 3rd dimension size is 1 ! else call 3D gather routine ! 1160: IF (p_pe == p_io) THEN size4 = (SIZE(gl,4)) IF (size4/=1) CALL finish('gather_sp4','size4/=1') size3 = (SIZE(gl,3)) ENDIF 1165: CALL p_bcast (size3, p_io) IF (size3==1) THEN gl2 => gl(:,:,1,1) CALL gather_sp0 (gl2, lc(:,:,1,1), gl_dc, source) ELSE 1170: gl3 => gl(:,:,:,1) CALL gather_sp3 (gl3, lc(:,:,:,1), gl_dc, source) ENDIF END SUBROUTINE gather_sp4 !------------------------------------------------------------------------------ 1175: SUBROUTINE gather_sp3 (gl, lc, gl_dc, source) ! ! gather global spectral field from spectral space (nlev,2,nsp) ! REAL ,POINTER :: gl (:,:,:) ! global field 1180: REAL ,INTENT(in) :: lc (:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(:) ! global decomposition INTEGER, OPTIONAL ,INTENT(in) :: source ! source to gather from ! ! -1=all;0=p_io;1=not p_io ! 1185: ! Data structures: ! ! global spectral field: GL (1:nlev ,2,1:nspec ) ! local Legendre space: LC (1:nlev ,2,1:snsp) ! 1190: ! GL covers the longitudinal wave numbers m=0..nm ! The coefficients corresponding to wave number (n,m) are stored in ! GL(:,:,nmp(m+1)+n+1) ! ! LC covers NSM longitudinal wave numbers m=SM(i), i=1,NSM. 1195: ! The coefficients corresponding to wave number (n,m) are stored in ! LC(:,:,nsmp(i)+n+1-snn0(i)) ! ! local variables ! 1200: REAL ,ALLOCATABLE :: buf (:,:,:) ! buffer (lev,2,nsp) INTEGER :: i, im ! loop indices INTEGER :: pe ! processor to communicate with INTEGER :: nlev ! number of levels INTEGER :: mp1 ! wavenumber m+1 1205: INTEGER :: snsp ! number of spectral coefficients on PE INTEGER :: nsm ! number of m wavenumbers on pe INTEGER :: mp, np ! displacement, no n waves per m INTEGER :: mpgl ! displacement on global processor INTEGER :: src ! source 1210: src = debug_parallel IF (debug_parallel >= 0 .AND. PRESENT(source)) src = source ! ! send if pe = p_io ! 1215: IF (p_pe == p_io) THEN DO i = 1, p_nprocs pe = gl_dc(i)% pe ! ! short names 1220: ! nsm = gl_dc(i)% nsm ! number of m vavenumbers handled by pe snsp = gl_dc(i)% snsp ! number of spectral (complex) coeff. nlev = SIZE(gl,1) IF (snsp < 1) CYCLE 1225: ! ! ALLOCATE (buf (nlev, 2, snsp)) ! ! receive 1230: ! IF (pe /= p_pe) THEN CALL p_recv( buf, pe, tag_gather_sp) ELSE buf = lc(:,:,:) 1235: ENDIF IF(src==-1 .OR. (src==0 .AND. p_io==pe) & .OR. (src==1 .AND. p_io/=pe)) THEN ! ! unpack 1240: ! mp=0 DO im=1,nsm mp1 = gl_dc(i)% sm(im)+1 np = gl_dc(i)% snnp(im) 1245: mpgl = gl_dc(i)% nmp (mp1) + gl_dc(i)% snn0(im) gl (:,:,mpgl+1:mpgl+np) = & buf (:,:,mp +1:mp +np ) mp = mp + np END DO 1250: IF (mp /= snsp) THEN WRITE(nerr,*) 'gather_sp: PE',pe,',mp/=snsp:',mp,snsp CALL finish('gather_sp','mp/=snsp') ENDIF ENDIF 1255: DEALLOCATE (buf) END DO ELSE ! ! send if (p_io /= p_pe) 1260: ! IF(SIZE(lc)>0) THEN CALL p_send (lc(:,:,:), p_io, tag_gather_sp) ENDIF END IF 1265: END SUBROUTINE gather_sp3 !------------------------------------------------------------------------------ SUBROUTINE gather_sp0 (gl, lc, gl_dc, source) ! ! gather global spectral field from spectral space (m=0 only) (nlev,nnp1) 1270: ! REAL ,POINTER :: gl (:,:) ! global field REAL ,INTENT(in) :: lc (:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(:) ! global decomposition INTEGER, OPTIONAL ,INTENT(in) :: source ! source to gather from 1275: ! ! -1=all;0=p_io;1=not p_io ! ! Data structures: ! ! local variables 1280: ! REAL ,ALLOCATABLE :: buf (:,:) ! buffer (lev,nnp1) INTEGER :: i ! loop indices INTEGER :: pe ! processor to communicate with INTEGER :: nsnm0 ! number of coefficiens for m=0 on PE 1285: INTEGER :: snn0 ! number of first n coefficient on PE INTEGER :: nlev ! number of levels INTEGER :: src ! source src = debug_parallel IF (debug_parallel >= 0 .AND. PRESENT(source)) src = source 1290: ! ! send if pe = p_io ! IF (p_pe == p_io) THEN DO i = 1, p_nprocs 1295: pe = gl_dc(i)% pe ! ! short names ! nlev = SIZE(gl,1) ! number of levels 1300: nsnm0 = gl_dc(i)% nsnm0 ! number of coefficiens for m=0 on PE IF (nsnm0 == 0) CYCLE snn0 = gl_dc(i)% snn0(1) ! number of first n coefficient on PE ! ALLOCATE (buf (nlev, nsnm0)) 1305: ! ! receive ! IF (pe /= p_pe) THEN CALL p_recv( buf, pe, tag_gather_sp) 1310: ELSE buf = lc(:,:) ENDIF IF(src==-1 .OR. (src==0 .AND. p_io==pe) & .OR. (src==1 .AND. p_io/=pe)) THEN 1315: ! ! unpack ! gl(:,1+snn0:nsnm0+snn0) = buf (:,:) ENDIF 1320: DEALLOCATE (buf) END DO ELSE ! ! send if (p_io /= p_pe) 1325: ! IF(SIZE(lc)>0) CALL p_send (lc(:,:), p_io, tag_gather_sp) END IF END SUBROUTINE gather_sp0 !============================================================================== 1330: SUBROUTINE gather_sa42 (gl, lc, gl_dc, source) ! ! gather global grid point field from pe's (nlev,2,nmp1,nhgl) ! or (nlev,nhgl,1,1) ! 1335: REAL ,POINTER :: gl (:,:,:,:) ! global field REAL ,INTENT(in) :: lc (:,:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(0:) ! global decomposition INTEGER, OPTIONAL ,INTENT(in) :: source ! source to gather from ! !-1=all;0=p_io;1=not p_io 1340: INTEGER :: size3 ! size of 3rd dimension * size of 4th dimension REAL ,POINTER :: gl2(:,:) ! ! call 2D gather routine if 3rd,4th index is 1 ! else call 3D gather routine 1345: ! IF (p_pe == p_io) size3 = SIZE(gl,3) * SIZE(gl,4) CALL p_bcast (size3, p_io) IF (size3 == 1) THEN IF (p_pe == p_io) gl2 => gl(:,:,1,1) 1350: CALL gather_sa2 (gl2, lc(:,:,1,1), gl_dc, source) ELSE CALL gather_sa4 (gl, lc, gl_dc, source) ENDIF END SUBROUTINE gather_sa42 1355: !------------------------------------------------------------------------------ SUBROUTINE gather_sa4 (gl, lc, gl_dc, source) ! ! receive global fourier (symetric/asymetric part) from Legendre space ! (nlev[+1],2,nmp1,nhgl), nlev,nmp1 split over PEs 1360: ! REAL ,POINTER :: gl (:,:,:,:) ! global field REAL ,INTENT(in) :: lc (:,:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc (:) ! global decomposition INTEGER, OPTIONAL ,INTENT(in) :: source ! 1365: ! ! local variables ! REAL ,ALLOCATABLE :: buf (:,:,:,:) ! buffer (nlev[+1],2,nmp1,nhgl) INTEGER :: i, im ! loop indices 1370: INTEGER :: pe ! processor to communicate with INTEGER :: mp1 ! wavenumber m+1 INTEGER :: llevs, lleve, nllevp1, nlm, nhgl INTEGER :: ke, nk INTEGER :: src 1375: src = debug_parallel IF (debug_parallel >= 0 .AND. PRESENT(source)) src = source ! ! Data structures: as defined in scatter_sa ! 1380: ! receive if pe = p_io ! IF (p_pe == p_io) THEN DO i = 1, p_nprocs pe = gl_dc(i)% pe 1385: ! ! short names ! nllevp1= gl_dc(i)% nllevp1 ! number of levels handled by pe llevs = gl_dc(i)% llevs ! start level 1390: lleve = gl_dc(i)% lleve ! end level nlm = gl_dc(i)% nlm ! number of m wavenumbers handled by pe nhgl = gl_dc(i)% nlat/2. ! half number of gaussian latitudes ke = MIN (lleve,SIZE(gl,1)) nk = ke - llevs + 1 ! number of levels to receive 1395: IF (nk < 1) CYCLE ! ! ALLOCATE (buf (nk,2,nlm,nhgl)) ! 1400: ! receive ! IF (pe /= p_pe) THEN CALL p_recv( buf, pe, tag_gather_sa) ELSE 1405: buf = lc(:,:,:,:) ENDIF ! ! unpack ! 1410: IF(src==-1 .OR. (src==0 .AND. p_io==pe) & .OR. (src==1 .AND. p_io/=pe)) THEN DO im=1,nlm mp1 = gl_dc(i)% lm(im)+1 gl (llevs:ke,:,mp1,:) = buf (:,:,im,:) 1415: END DO ENDIF DEALLOCATE (buf) END DO ELSE 1420: ! ! send if (p_io /= p_pe) ! IF(SIZE(lc,1)>0) THEN CALL p_send (lc(:,:,:,:), p_io, tag_gather_sa) 1425: END IF END IF END SUBROUTINE gather_sa4 !------------------------------------------------------------------------------ SUBROUTINE gather_sa2 (gl, lc, gl_dc, source) 1430: ! ! receive global fourier (symetric/asymetric part) from Legendre space ! (nlev[+1],nhgl), nlev split over PEs, present only if nlnm0>0 ! REAL ,POINTER :: gl (:,:) ! global field 1435: REAL ,INTENT(in) :: lc (:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc (:) ! global decomposition INTEGER, OPTIONAL ,INTENT(in) :: source ! ! ! local variables 1440: ! REAL ,ALLOCATABLE :: buf (:,:) ! buffer (nlev[+1],nhgl) INTEGER :: i ! loop indices INTEGER :: pe ! processor to communicate with INTEGER :: llevs, lleve, nllevp1, nlnm0, nhgl 1445: INTEGER :: ke, nk INTEGER :: src src = debug_parallel IF (debug_parallel >= 0 .AND. PRESENT(source)) src = source ! 1450: ! Data structures: as defined in scatter_sa ! ! receive if pe = p_io ! IF (p_pe == p_io) THEN 1455: DO i = 1, p_nprocs pe = gl_dc(i)% pe ! ! short names ! 1460: nllevp1= gl_dc(i)% nllevp1 ! number of levels handled by pe llevs = gl_dc(i)% llevs ! start level lleve = gl_dc(i)% lleve ! end level nlnm0 = gl_dc(i)% nlnm0 ! number of n wavenumbers for m=0 nhgl = gl_dc(i)% nlat/2. ! half number of gaussian latitudes 1465: ke = MIN (lleve,SIZE(gl,1)) nk = ke - llevs + 1 ! number of levels to receive IF (nk < 1 .OR. nlnm0 < 1) CYCLE ! ! 1470: ALLOCATE (buf (nk,nhgl)) ! ! receive ! IF (pe /= p_pe) THEN 1475: CALL p_recv( buf, pe, tag_gather_sa) ELSE buf = lc(:,:) ENDIF ! 1480: ! unpack ! IF(src==-1 .OR. (src==0 .AND. p_io==pe) & .OR. (src==1 .AND. p_io/=pe)) THEN gl (llevs:ke,:) = buf (:,:) 1485: ENDIF DEALLOCATE (buf) END DO ELSE ! 1490: ! send if (p_io /= p_pe) ! i = indx (p_pe, gl_dc) nlnm0 = gl_dc(i)% nlnm0 ! number of n wavenumbers for m=0 IF(SIZE(lc,1)>0 .AND. nlnm0>0 ) THEN 1495: CALL p_send (lc(:,:), p_io, tag_gather_sa) END IF END IF END SUBROUTINE gather_sa2 !============================================================================== 1500: SUBROUTINE scatter_sa42 (gl, lc, gl_dc) ! ! scatter global grid point field from pe's (nlev,2,nmp1,nhgl) ! or (nlev,nhgl,1,1) ! 1505: REAL ,POINTER :: gl (:,:,:,:) ! global field REAL ,INTENT(out) :: lc (:,:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(0:) ! global decomposition INTEGER :: size3 ! size of 3rd dimension * size of 4th dimension REAL ,POINTER :: gl2(:,:) 1510: ! ! call 2D scatter routine if 3rd,4th index is 1 ! else call 3D scatter routine ! IF (p_pe == p_io) size3 = SIZE(gl,3) * SIZE(gl,4) 1515: CALL p_bcast (size3, p_io) IF (size3 == 1) THEN IF (p_pe == p_io) gl2 => gl(:,:,1,1) CALL scatter_sa2 (gl2, lc(:,:,1,1), gl_dc) ELSE 1520: CALL scatter_sa4 (gl, lc, gl_dc) ENDIF END SUBROUTINE scatter_sa42 !------------------------------------------------------------------------------ SUBROUTINE scatter_sa4 (gl, lc, gl_dc) 1525: ! ! receive global fourier (symetric/asymetric part) from Legendre space ! (nlev[+1],2,nmp1,nhgl), nlev,nmp1 split over PEs ! REAL ,POINTER :: gl (:,:,:,:) ! global field 1530: REAL ,INTENT(out) :: lc (:,:,:,:) ! local field TYPE (pe_decomposed) ,INTENT(in) :: gl_dc (:) ! global decomposition ! ! local variables ! 1535: REAL ,ALLOCATABLE :: buf (:,:,:,:) ! buffer (nlev[+1],2,nmp1,nhgl) INTEGER :: i, im ! loop indices INTEGER :: pe ! processor to communicate with INTEGER :: mp1 ! wavenumber m+1 INTEGER :: llevs, lleve, nllevp1, nlm, nhgl 1540: INTEGER :: ke, nk ! ! send if pe = p_io ! IF (p_pe == p_io) THEN 1545: DO i = 1, p_nprocs pe = gl_dc(i)% pe ! ! short names ! 1550: nllevp1= gl_dc(i)% nllevp1 ! number of levels handled by pe llevs = gl_dc(i)% llevs ! start level lleve = gl_dc(i)% lleve ! end level nlm = gl_dc(i)% nlm ! number of m wavenumbers handled by pe nhgl = gl_dc(i)% nlat/2. ! half number of gaussian latitudes 1555: ke = MIN (lleve,SIZE(gl,1)) nk = ke - llevs + 1 ! number of levels to receive IF (nk < 1) CYCLE ! ! 1560: ALLOCATE (buf (nk,2,nlm,nhgl)) ! ! pack ! DO im=1,nlm 1565: mp1 = gl_dc(i)% lm(im)+1 buf (:,:,im,:) = gl (llevs:ke,:,mp1,:) END DO ! ! send 1570: ! IF (pe /= p_pe) THEN CALL p_send( buf, pe, tag_scatter_sa) ELSE lc(:,:,:,:) = buf 1575: ENDIF DEALLOCATE (buf) END DO ELSE ! 1580: ! receive if (p_io /= p_pe) ! IF(SIZE(lc,1)>0) THEN CALL p_recv (lc(:,:,:,:), p_io, tag_scatter_sa) END IF 1585: END IF END SUBROUTINE scatter_sa4 !------------------------------------------------------------------------------ SUBROUTINE scatter_sa2 (gl, lc, gl_dc) ! 1590: ! receive global fourier (symetric/asymetric part) from Legendre space ! (nlev[+1],nhgl), nlev split over PEs, present only if nlnm0>0 ! REAL ,POINTER :: gl (:,:) ! global field REAL ,INTENT(out) :: lc (:,:) ! local field 1595: TYPE (pe_decomposed) ,INTENT(in) :: gl_dc (:) ! global decomposition ! ! local variables ! REAL ,ALLOCATABLE :: buf (:,:) ! buffer (nlev[+1],nhgl) 1600: INTEGER :: i ! loop indices INTEGER :: pe ! processor to communicate with INTEGER :: llevs, lleve, nllevp1, nlnm0, nhgl INTEGER :: ke, nk ! 1605: ! send if pe = p_io ! IF (p_pe == p_io) THEN DO i = 1, p_nprocs pe = gl_dc(i)% pe 1610: ! ! short names ! nllevp1= gl_dc(i)% nllevp1 ! number of levels handled by pe llevs = gl_dc(i)% llevs ! start level 1615: lleve = gl_dc(i)% lleve ! end level nlnm0 = gl_dc(i)% nlnm0 ! number of n wavenumbers for m=0 nhgl = gl_dc(i)% nlat/2. ! half number of gaussian latitudes ke = MIN (lleve,SIZE(gl,1)) nk = ke - llevs + 1 ! number of levels to receive 1620: IF (nk < 1 .OR. nlnm0 < 1) CYCLE ! ! ALLOCATE (buf (nk,nhgl)) ! 1625: ! pack ! buf (:,:) = gl (llevs:ke,:) ! ! send 1630: ! IF (pe /= p_pe) THEN CALL p_send( buf, pe, tag_scatter_sa) ELSE lc(:,:) = buf 1635: ENDIF DEALLOCATE (buf) END DO ELSE ! 1640: ! receive if (p_io /= p_pe) ! i = indx (p_pe, gl_dc) nlnm0 = gl_dc(i)% nlnm0 ! number of n wavenumbers for m=0 IF(SIZE(lc,1)>0 .AND. nlnm0>0 ) THEN 1645: CALL p_recv (lc(:,:), p_io, tag_scatter_sa) END IF END IF END SUBROUTINE scatter_sa2 !============================================================================== 1650: SUBROUTINE tr_gp_fs (gl_dc, sign, gp1, gp2, gp3, gp4, gp5, gp6, gp7,& sf1, sf2, sf3, zm1, zm2, zm3, fs, fs0) ! ! transpose ! sign= 1 : grid point space -> Fourier space 1655: ! sign=-1 : grid point space <- Fourier space ! ! TYPE (pe_decomposed) ,INTENT(in) :: gl_dc (:) ! decomposition INTEGER ,INTENT(in) :: sign ! 1:gp>fs; -1:gp<fs 1660: REAL ,INTENT(inout) :: gp1 (:,:,:) ! gridpoint space 3d REAL ,INTENT(inout) :: gp2 (:,:,:) ! REAL ,INTENT(inout) :: gp3 (:,:,:) ! REAL ,INTENT(inout) :: gp4 (:,:,:) ! REAL ,INTENT(inout) :: gp5 (:,:,:) ! 1665: REAL ,INTENT(inout) :: gp6 (:,:,:) ! REAL ,INTENT(inout) :: gp7 (:,:,:) ! REAL ,OPTIONAL ,INTENT(inout) :: sf1 (:,:) ! gridpoint space 2d REAL ,OPTIONAL ,INTENT(inout) :: sf2 (:,:) ! gridpoint space 2d REAL ,OPTIONAL ,INTENT(inout) :: sf3 (:,:) ! gridpoint space 2d 1670: REAL ,OPTIONAL ,INTENT(inout) :: zm1 (:,:) ! zonal mean REAL ,OPTIONAL ,INTENT(inout) :: zm2 (:,:) ! zonal mean REAL ,OPTIONAL ,INTENT(inout) :: zm3 (:,:) ! zonal mean REAL ,INTENT(inout) :: fs (:,:,:,:) ! Fourier space REAL ,OPTIONAL ,INTENT(inout) :: fs0 (:,:,:) ! zonal mean, Four. 1675: ! ! Data structures: ! ! local variables ! 1680: INTEGER :: i, j INTEGER :: pe ! pe to communicate with INTEGER :: imype ! index of this pe REAL ,ALLOCATABLE :: buf (:,:,:,:) ! buffer REAL ,ALLOCATABLE :: bu0 (:,:,:) ! buffer zonal means 1685: LOGICAL :: m0 ! transpose zonal means INTEGER :: seta ! my set A INTEGER :: ks, ke, nk ! vertical range of buffer INTEGER :: nk0 ! vertical range of buffer bu0 INTEGER :: nglat, nglon ! gridpoint space no. lons,lats 1690: INTEGER :: glons(2) ! first longitudes in gridspace INTEGER :: glone(2) ! last longitudes in gridspace INTEGER :: nlon ! global number of longitudes INTEGER :: nvar ! number of variables in fft buffer INTEGER :: nva0 ! number of variables (zonal mean) 1695: INTEGER :: nle0 ! number of levels (zonal mean) ! ! loop 1: send if (dest_pe > src_pe); else recv ! nvar = SIZE (fs,4) 1700: imype = indx (p_pe, gl_dc) seta = gl_dc(imype)% set_a nva0 = 0; IF (PRESENT(fs0)) nva0 = SIZE (fs0,3) nle0 = 0; IF (PRESENT(fs0)) nle0 = SIZE (fs0,1) SELECT CASE (sign) 1705: ! ! grid point -> Fourier ! CASE (1) ! DO i = 1, p_nprocs 1710: DO i = dc%spe, dc%epe IF (gl_dc(i)% set_a /= seta) CYCLE IF (i == imype) THEN ! DO j = 1, p_nprocs DO j = dc%spe, dc%epe 1715: pe = gl_dc(j)% pe IF (gl_dc(j)% set_a /= seta) CYCLE IF (.NOT. gl_dc(j)% lfused) CYCLE CALL alloc_gp_fs_buf (gp_i=imype, fs_i=j) CALL pack_gp_buf 1720: IF (pe /= p_pe) THEN CALL p_send ( buf, pe, tag_tr_gp_fs) IF (m0) CALL p_send ( bu0, pe, tag_tr_gp_fs) ELSE CALL unpack_buf_fs 1725: ENDIF DEALLOCATE (buf) IF (m0) DEALLOCATE (bu0) END DO ELSE 1730: pe = gl_dc(i)% pe IF (.NOT. gl_dc(imype)% lfused) CYCLE CALL alloc_gp_fs_buf (gp_i=i, fs_i=imype) CALL p_recv ( buf, pe, tag_tr_gp_fs) IF (m0) CALL p_recv ( bu0, pe, tag_tr_gp_fs) 1735: CALL unpack_buf_fs DEALLOCATE (buf) IF (m0) DEALLOCATE (bu0) ENDIF END DO 1740: ! ! zero latitudes > nlat ! fs (gl_dc(imype)% nlon+1:,:,:,:) = 0. CASE (-1) 1745: ! ! Fourier -> grid point ! ! DO i = 1, p_nprocs DO i = dc%spe, dc%epe 1750: IF (gl_dc(i)% set_a /= seta) CYCLE IF (i == imype) THEN ! DO j = 1, p_nprocs DO j = dc%spe, dc%epe pe = gl_dc(j)% pe 1755: IF (gl_dc(j)% set_a /= seta) CYCLE IF (gl_dc(j)% nglat==0) CYCLE CALL alloc_gp_fs_buf (gp_i=j, fs_i=imype) CALL pack_fs_buf IF (pe /= p_pe) THEN 1760: CALL p_send ( buf, pe, tag_tr_gp_fs) IF (m0) CALL p_send ( bu0, pe, tag_tr_gp_fs) ELSE CALL unpack_buf_gp ENDIF 1765: DEALLOCATE (buf) IF (m0) DEALLOCATE (bu0) END DO ELSE pe = gl_dc(i)% pe 1770: IF (gl_dc(imype)% nglat==0) CYCLE CALL alloc_gp_fs_buf (gp_i=imype, fs_i=i) CALL p_recv ( buf, pe, tag_tr_gp_fs) IF (m0) CALL p_recv ( bu0, pe, tag_tr_gp_fs) CALL unpack_buf_gp 1775: DEALLOCATE (buf) IF (m0) DEALLOCATE (bu0) ENDIF END DO CASE default 1780: CALL finish ('tr_fs_buf','invalid SIGN parameter (not 1,-1)') END SELECT CONTAINS !------------------------------------------------------------------------------ SUBROUTINE alloc_gp_fs_buf (gp_i, fs_i) 1785: INTEGER :: gp_i, fs_i ! indices of grid point and Fourier space pe ! ! derive bounds and allocate buffer ! ks = gl_dc(fs_i)% flevs 1790: ke = gl_dc(fs_i)% fleve nk = gl_dc(fs_i)% nflevp1 nk0 = gl_dc(fs_i)% nflev nglat = gl_dc(gp_i)% nglat nglon = gl_dc(gp_i)% nglon 1795: glons = gl_dc(gp_i)% glons glone = gl_dc(gp_i)% glone nlon = gl_dc(gp_i)% nlon m0 = PRESENT(fs0).AND.(fs_i==gp_i.OR.sign==-1) ALLOCATE (buf (nglon, nk, nglat, nvar) ) 1800: IF (m0) ALLOCATE (bu0 (nk0, nglat, nva0)) END SUBROUTINE alloc_gp_fs_buf !------------------------------------------------------------------------------ SUBROUTINE pack_gp_buf ! 1805: ! pack message to send/recv buffer buf ! ! ! pack 2d arrays ! 1810: IF (ke == gl_dc(imype)% nlev+1) THEN buf (:,nk,:,:) = 0. IF(PRESENT(sf1)) buf (:,nk,:,1) = sf1(:nglon,:) IF(PRESENT(sf2)) buf (:,nk,:,2) = sf2(:nglon,:) IF(PRESENT(sf3)) buf (:,nk,:,3) = sf3(:nglon,:) 1815: ke = ke - 1 nk = nk - 1 ENDIF ! ! pack 3d arrays 1820: ! IF(nk > 0) THEN buf (:,:nk,:,1) = gp1 (:nglon,ks:ke,:) buf (:,:nk,:,2) = gp2 (:nglon,ks:ke,:) buf (:,:nk,:,3) = gp3 (:nglon,ks:ke,:) 1825: buf (:,:nk,:,4) = gp4 (:nglon,ks:ke,:) buf (:,:nk,:,5) = gp5 (:nglon,ks:ke,:) buf (:,:nk,:,6) = gp6 (:nglon,ks:ke,:) buf (:,:nk,:,7) = gp7 (:nglon,ks:ke,:) ENDIF 1830: ! ! pack zonal mean ! IF (m0) THEN IF(PRESENT(zm1)) bu0 (:,:,1) = zm1 (ks:ke,:) 1835: IF(PRESENT(zm2)) bu0 (:,:,2) = zm2 (ks:ke,:) IF(PRESENT(zm3)) bu0 (:,:,3) = zm3 (ks:ke,:) ENDIF END SUBROUTINE pack_gp_buf !------------------------------------------------------------------------------ 1840: SUBROUTINE unpack_buf_gp ! ! unpack grid point space from send/recv buffer buf ! ! 1845: ! 2d arrays ! IF (ke == gl_dc(imype)% nlev+1) THEN IF(PRESENT(sf1)) THEN sf1(:nglon,:) = buf (:,nk,:,1); sf1(nglon+1:,:)=0. 1850: ENDIF IF(PRESENT(sf2)) THEN sf2(:nglon,:) = buf (:,nk,:,2); sf2(nglon+1:,:)=0. ENDIF IF(PRESENT(sf3)) THEN 1855: sf3(:nglon,:) = buf (:,nk,:,3); sf3(nglon+1:,:)=0. ENDIF ke = ke - 1 nk = nk - 1 ENDIF 1860: ! ! unpack 3d arrays ! IF(nk > 0) THEN gp1 (:nglon,ks:ke,:) = buf (:,:nk,:,1); gp1 (nglon+1:,ks:ke,:)=0. 1865: gp2 (:nglon,ks:ke,:) = buf (:,:nk,:,2); gp2 (nglon+1:,ks:ke,:)=0. gp3 (:nglon,ks:ke,:) = buf (:,:nk,:,3); gp3 (nglon+1:,ks:ke,:)=0. gp4 (:nglon,ks:ke,:) = buf (:,:nk,:,4); gp4 (nglon+1:,ks:ke,:)=0. gp5 (:nglon,ks:ke,:) = buf (:,:nk,:,5); gp5 (nglon+1:,ks:ke,:)=0. gp6 (:nglon,ks:ke,:) = buf (:,:nk,:,6); gp6 (nglon+1:,ks:ke,:)=0. 1870: gp7 (:nglon,ks:ke,:) = buf (:,:nk,:,7); gp7 (nglon+1:,ks:ke,:)=0. ENDIF ! ! unpack zonal mean ! 1875: IF (m0) THEN IF(PRESENT(zm1)) zm1 (ks:ke,:) = bu0 (:,:,1) IF(PRESENT(zm2)) zm2 (ks:ke,:) = bu0 (:,:,2) IF(PRESENT(zm3)) zm3 (ks:ke,:) = bu0 (:,:,3) ENDIF 1880: END SUBROUTINE unpack_buf_gp !------------------------------------------------------------------------------ SUBROUTINE unpack_buf_fs ! ! unpack message to fourier buffer fs 1885: ! ! ! unpack first segment ! fs(glons(1):glone(1),:,:nglat/2,:) = buf(:,:,:nglat/2,:) 1890: ! ! unpack second segment ! IF (glone(2)>glons(2)) THEN fs(glons(2):glone(2),:,nglat/2+1:,:) = buf(:,:,nglat/2+1:,:) 1895: ELSE ! ! unpack second segment, split into longitudes ! fs (glons(2) :nlon ,:,nglat/2+1:,:) = & 1900: buf( :nlon-glons(2)+1,:,nglat/2+1:,:) fs (1 :glone(2) ,:,nglat/2+1:,:) = & buf(nglon-glone(2)+1: ,:,nglat/2+1:,:) ENDIF 1905: ! ! unpack zonal mean ! IF (m0) fs0 (:,:,:) = bu0 (:,:,:) END SUBROUTINE unpack_buf_fs 1910: !------------------------------------------------------------------------------ SUBROUTINE pack_fs_buf ! ! pack fourier buffer fs to buffer ! 1915: ! ! pack first segment ! buf(:,:,:nglat/2,:) = fs(glons(1):glone(1),:,:nglat/2,:) ! 1920: ! pack second segment ! IF (glone(2)>glons(2)) THEN buf(:,:,nglat/2+1:,:) = fs(glons(2):glone(2),:,nglat/2+1:,:) ELSE 1925: ! ! pack second segment, split into longitudes ! buf ( :nlon-glons(2)+1,:,nglat/2+1:,:) = & fs (glons(2) :nlon ,:,nglat/2+1:,:) 1930: buf (nglon-glone(2)+1: ,:,nglat/2+1:,:) = & fs (1 :glone(2) ,:,nglat/2+1:,:) ENDIF ! ! pack zonal mean 1935: ! IF (m0) bu0 (:,:,:) = fs0 (:,:,:) END SUBROUTINE pack_fs_buf !------------------------------------------------------------------------------ END SUBROUTINE tr_gp_fs 1940: !============================================================================== SUBROUTINE tr_fs_ls (gl_dc, sign, fs, ls, fs0, ls0) ! ! transpose ! sign= 1 : Fourier space -> Legendre space 1945: ! sign=-1 : Fourier space <- Legendre space ! TYPE (pe_decomposed) ,INTENT(in) :: gl_dc (:) ! decomposition INTEGER ,INTENT(in) :: sign ! 1:fs>ls; -1:gs<ls ! 1950: ! Assumed shape array association: ! REAL ,INTENT(inout) :: fs (:,:,:,:) ! fs REAL ,INTENT(inout) :: ls (:,:,:,:) ! ls REAL ,OPTIONAL ,INTENT(inout) :: fs0 (:,:,:) ! fs, zonal means 1955: REAL ,OPTIONAL ,INTENT(inout) :: ls0 (:,:,:) ! ls, zonal means ! ! Array element sequence association to speed up indexing: ! ! 1960: ! local variables ! INTEGER :: i, j ! loop indices INTEGER :: imype ! index of this pe INTEGER :: nvar ! number of variables 1965: INTEGER :: nva0 ! number of variables (m=0 only) INTEGER :: nlev ! number of levels INTEGER :: nlev0 ! number of levels (m=0 only) INTEGER :: nflat ! number of latitudes (2*nhgl) INTEGER :: flats(2) ! first latitude in Fourier space 1970: INTEGER :: flate(2) ! last latitude in Fourier space INTEGER :: nlm ! number of m coeff. in Legendre space INTEGER ,POINTER :: intr(:) ! index array LOGICAL :: m0 ! coeff. m=0 present in Legendre space INTEGER :: setb ! set B 1975: INTEGER :: pe ! processor to communicate with INTEGER :: n2mp1 ! total number of coeff. from lgti REAL ,ALLOCATABLE :: buf(:,:,:,:) ! send/receive buffer REAL ,ALLOCATABLE :: bu0 (:,:,:) ! send/receive buffer (zonal mean) ! 1980: ! invariant local variables ! nvar = SIZE (fs,4) nva0 = 0; nlev0 = 0 IF (PRESENT(fs0).AND.PRESENT(ls0)) THEN 1985: nva0 = SIZE (fs0,3); nlev0 = SIZE (fs0,1) ENDIF nlev = SIZE (fs,2) imype = indx (p_pe, gl_dc) setb = gl_dc(imype)% set_b 1990: n2mp1 = (gl_dc(imype)% nm + 1) * 2 SELECT CASE (sign) ! ! Fourier -> Legendre space ! 1995: CASE (1) ! DO i = 1, p_nprocs DO i = dc%spe, dc%epe IF (gl_dc(i)% set_b /= setb) CYCLE IF (i == imype) THEN 2000: ! DO j = 1, p_nprocs DO j = dc%spe, dc%epe pe = gl_dc(j)% pe IF (gl_dc(j)% set_b /= setb) CYCLE IF (.NOT. gl_dc(imype)% lfused) CYCLE 2005: CALL alloc_fs_ls_buf (fs_i=imype, ls_i=j) CALL pack_fs_buf IF (pe /= p_pe) THEN CALL p_send ( buf, pe, tag_tr_fs_ls) IF (m0) CALL p_send ( bu0, pe, tag_tr_fs_ls) 2010: ELSE CALL unpack_buf_ls ENDIF DEALLOCATE (buf) IF (m0) DEALLOCATE (bu0) 2015: END DO ELSE pe = gl_dc(i)% pe IF (.NOT. gl_dc(i)% lfused) CYCLE CALL alloc_fs_ls_buf (fs_i=i, ls_i=imype) 2020: CALL p_recv ( buf, pe, tag_tr_fs_ls) IF (m0) CALL p_recv ( bu0, pe, tag_tr_fs_ls) CALL unpack_buf_ls DEALLOCATE (buf) IF (m0) DEALLOCATE (bu0) 2025: ENDIF END DO CASE (-1) ! ! Legendre -> Fourier 2030: ! ! DO i = 1, p_nprocs DO i = dc%spe, dc%epe IF (gl_dc(i)% set_b /= setb) CYCLE IF (i == imype) THEN 2035: ! DO j = 1, p_nprocs DO j = dc%spe, dc%epe pe = gl_dc(j)% pe IF (gl_dc(j)% set_b /= setb) CYCLE CALL alloc_fs_ls_buf (fs_i=j, ls_i=imype) 2040: CALL pack_ls_buf IF (pe /= p_pe) THEN CALL p_send ( buf, pe, tag_tr_fs_ls) IF (m0) CALL p_send ( bu0, pe, tag_tr_fs_ls) ELSE 2045: CALL unpack_buf_fs ENDIF DEALLOCATE (buf) IF (m0) DEALLOCATE (bu0) END DO 2050: ELSE pe = gl_dc(i)% pe CALL alloc_fs_ls_buf (fs_i=imype, ls_i=i) CALL p_recv ( buf, pe, tag_tr_fs_ls) IF (m0) CALL p_recv ( bu0, pe, tag_tr_fs_ls) 2055: CALL unpack_buf_fs DEALLOCATE (buf) IF (m0) DEALLOCATE (bu0) ENDIF END DO 2060: fs (n2mp1+1:,:,:,:) = 0. !zero coefficients not provided by inv. Legendre CASE default CALL finish ('tr_fs_ls','invalid SIGN parameter (not 1,-1)') END SELECT CONTAINS 2065: !------------------------------------------------------------------------------ SUBROUTINE alloc_fs_ls_buf (fs_i, ls_i) INTEGER :: fs_i, ls_i ! indices of Fourier and Lagendre space pe ! ! derive bounds and allocate buffer 2070: ! nflat = gl_dc(fs_i)% nflat flats = gl_dc(fs_i)% flats flate = gl_dc(fs_i)% flate nlm = gl_dc(ls_i)% nlm 2075: m0 = gl_dc(ls_i)% nlnm0 > 0 .AND. PRESENT(fs0) intr => gl_dc(ls_i)% intr ALLOCATE (buf(2*nlm,nlev,nflat,nvar)) IF(m0) ALLOCATE (bu0 (nlev0,nflat,nva0)) END SUBROUTINE alloc_fs_ls_buf 2080: !------------------------------------------------------------------------------ SUBROUTINE pack_fs_buf buf(:,:,:,:) = fs (intr,:,:,:) IF(m0) bu0 = fs0 END SUBROUTINE pack_fs_buf 2085: !------------------------------------------------------------------------------ SUBROUTINE unpack_buf_fs fs (intr,:,:,:) = buf(:,:,:,:) IF(m0) fs0 = bu0 END SUBROUTINE unpack_buf_fs 2090: !------------------------------------------------------------------------------ SUBROUTINE unpack_buf_ls ls(:,:,flats(1):flate(1),:) = buf (:,:, :nflat/2,:) ls(:,:,flats(2):flate(2),:) = buf (:,:,nflat/2+1: ,:) IF(m0) THEN 2095: ls0 (:,flats(1):flate(1),:) = bu0 (:, :nflat/2,:) ls0 (:,flats(2):flate(2),:) = bu0 (:,nflat/2+1: ,:) ENDIF END SUBROUTINE unpack_buf_ls !------------------------------------------------------------------------------ 2100: SUBROUTINE pack_ls_buf buf (:,:, :nflat/2,:) = ls(:,:,flats(1):flate(1),:) buf (:,:,nflat/2+1: ,:) = ls(:,:,flats(2):flate(2),:) IF(m0) THEN bu0 (:, :nflat/2,:) = ls0 (:,flats(1):flate(1),:) 2105: bu0 (:,nflat/2+1: ,:) = ls0 (:,flats(2):flate(2),:) ENDIF END SUBROUTINE pack_ls_buf !------------------------------------------------------------------------------ END SUBROUTINE tr_fs_ls 2110: !============================================================================== SUBROUTINE tr_ls_sp (gl_dc, sign, ls1, sp1, ls2, sp2, ls3, sp3, ls0, sp0) ! ! transpose ! sign= 1 : Legendre space -> spectral space 2115: ! sign=-1 : Legendre space <- spectral space ! TYPE (pe_decomposed) ,INTENT(in) :: gl_dc (:) ! decomposition INTEGER ,INTENT(in) :: sign ! 1:ls>sp; -1:ls<sp REAL ,INTENT(inout) :: ls1 (:,:,:) ! Legendre space 2120: REAL ,INTENT(inout) :: sp1 (:,:,:) ! spectral space REAL ,INTENT(inout) :: ls2 (:,:,:) ! Legendre space REAL ,INTENT(inout) :: sp2 (:,:,:) ! spectral space REAL ,INTENT(inout) :: ls3 (:,:,:) ! Legendre space REAL ,INTENT(inout) :: sp3 (:,:,:) ! spectral space 2125: REAL ,OPTIONAL ,INTENT(inout) :: ls0 (:,:) ! Legendre (m=0 only) REAL ,OPTIONAL ,INTENT(inout) :: sp0 (:,:) ! spectral (m=0 only) ! ! local variables ! 2130: REAL ,ALLOCATABLE :: buf (:,:,:,:) ! send/receive buffer REAL ,ALLOCATABLE :: bu0 (:,:) ! send/receive buffer (m=0 only) INTEGER :: seta ! this set A INTEGER :: i, j ! loop indices INTEGER :: imype ! decomposition table index of this pe 2135: INTEGER :: pe ! processor to communicate with INTEGER :: nllevp1 ! number of levels in Legendre space INTEGER :: llevs ! first level in Legendre space INTEGER :: lleve ! last level in Legendre space INTEGER :: nlnm0 ! number of coeff. with m=0 (Legendre) 2140: INTEGER :: snsp ! number of coefficients in sp. space INTEGER :: ssps ! first coefficients in spectral space INTEGER :: sspe ! last coefficients in spectral space INTEGER :: nsnm0 ! number of coeff. with m=0 (spectral) INTEGER :: snn0 ! first n for m=0 in spectral space 2145: LOGICAL :: m0 ! transpose array with m=0 only INTEGER :: ke, nk ! actual last level, number of levels imype = indx (p_pe, gl_dc) seta = gl_dc(imype)% set_a SELECT CASE (sign) 2150: ! ! Legendre space -> spectral space ! CASE (1) ! DO i = 1, p_nprocs 2155: DO i = dc%spe, dc%epe IF (gl_dc(i)% set_a /= seta) CYCLE IF (i == imype) THEN ! DO j = 1,p_nprocs DO j = dc%spe, dc%epe 2160: pe = gl_dc(j)% pe IF (gl_dc(j)% set_a /= seta) CYCLE CALL alloc_ls_sp_buf (ls_i=imype, sp_i=j) CALL pack_ls_buf IF (pe /= p_pe) THEN 2165: CALL p_send ( buf, pe, tag_tr_ls_sp) IF (m0) CALL p_send ( bu0, pe, tag_tr_ls_sp) ELSE CALL unpack_buf_sp ENDIF 2170: DEALLOCATE (buf) IF (m0) DEALLOCATE (bu0) END DO ELSE pe = gl_dc(i)% pe 2175: CALL alloc_ls_sp_buf (ls_i=i, sp_i=imype) CALL p_recv ( buf, pe, tag_tr_ls_sp) IF (m0) CALL p_recv ( bu0, pe, tag_tr_ls_sp) CALL unpack_buf_sp DEALLOCATE (buf) 2180: IF (m0) DEALLOCATE (bu0) ENDIF END DO CASE (-1) ! 2185: ! Legendre space <- spectral space ! ! DO i = 1, p_nprocs DO i = dc%spe, dc%epe IF (gl_dc(i)% set_a /= seta) CYCLE 2190: IF (i == imype) THEN ! DO j = 1, p_nprocs DO j = dc%spe, dc%epe pe = gl_dc(j)% pe IF (gl_dc(j)% set_a /= seta) CYCLE 2195: CALL alloc_ls_sp_buf (ls_i=j, sp_i=imype) CALL pack_sp_buf IF (pe /= p_pe) THEN CALL p_send ( buf, pe, tag_tr_ls_sp) IF(m0) CALL p_send ( bu0, pe, tag_tr_ls_sp) 2200: ELSE CALL unpack_buf_ls ENDIF DEALLOCATE (buf) IF (m0) DEALLOCATE (bu0) 2205: END DO ELSE pe = gl_dc(i)% pe CALL alloc_ls_sp_buf (ls_i=imype, sp_i=i) CALL p_recv ( buf, pe, tag_tr_ls_sp) 2210: IF(m0) CALL p_recv ( bu0, pe, tag_tr_ls_sp) CALL unpack_buf_ls DEALLOCATE (buf) IF (m0) DEALLOCATE (bu0) ENDIF 2215: END DO CASE default CALL finish ('tr_ls_sp','invalid SIGN parameter (not 1,-1)') END SELECT CONTAINS 2220: !------------------------------------------------------------------------------ SUBROUTINE alloc_ls_sp_buf (ls_i, sp_i) INTEGER :: ls_i, sp_i ! Legendre and spectral space pe indices ! ! derive bounds and allocate buffer 2225: ! nllevp1 = gl_dc(ls_i)% nllevp1! number of levels in Legendre space llevs = gl_dc(ls_i)% llevs ! first level in Legendre space lleve = gl_dc(ls_i)% lleve ! last level in Legendre space nlnm0 = gl_dc(ls_i)% nlnm0 ! number of coeff. with m=0 in Legendre space 2230: snsp = gl_dc(sp_i)% snsp ! number of coefficients in sp. space ssps = gl_dc(sp_i)% ssps ! first coefficients in spectral space sspe = gl_dc(sp_i)% sspe ! last coefficients in spectral space nsnm0 = gl_dc(sp_i)% nsnm0 ! number of coeff. with m=0 in spectral space m0 = PRESENT(ls0) .AND. PRESENT(sp0) .AND. nlnm0>0 .AND. nsnm0>0 2235: IF(m0) snn0 = gl_dc(sp_i)% snn0(1) ALLOCATE (buf (nllevp1, 2, snsp, 3)) 2240: IF (m0) THEN ALLOCATE (bu0 (nllevp1, nsnm0)) ENDIF END SUBROUTINE alloc_ls_sp_buf 2245: !------------------------------------------------------------------------------ SUBROUTINE pack_ls_buf buf (:SIZE(ls1,1),:,:,1) = ls1 (:,:,ssps:sspe) buf (:SIZE(ls2,1),:,:,2) = ls2 (:,:,ssps:sspe) buf (:SIZE(ls3,1),:,:,3) = ls3 (:,:,ssps:sspe) 2250: IF (m0) bu0 (:SIZE(ls0,1),:) = ls0 (:,snn0+1:snn0+nsnm0) END SUBROUTINE pack_ls_buf !------------------------------------------------------------------------------ SUBROUTINE unpack_buf_sp ke = MIN(SIZE(sp1,1), lleve); nk = ke-llevs+1 2255: IF(nk>0) sp1 (llevs:ke, :, :) = buf (:nk,:,:,1) ke = MIN(SIZE(sp2,1), lleve); nk = ke-llevs+1 IF(nk>0) sp2 (llevs:ke, :, :) = buf (:nk,:,:,2) ke = MIN(SIZE(sp3,1), lleve); nk = ke-llevs+1 IF(nk>0) sp3 (llevs:ke, :, :) = buf (:nk,:,:,3) 2260: IF (m0) THEN ke = MIN(SIZE(sp0,1), lleve); nk = ke-llevs+1 IF(nk>0) sp0 (llevs:ke,:) = bu0 (:nk,:) ENDIF END SUBROUTINE unpack_buf_sp 2265: !------------------------------------------------------------------------------ SUBROUTINE pack_sp_buf ke = MIN(SIZE(sp1,1), lleve); nk = MAX(0, ke-llevs+1) buf (:nk,:,:,1) = sp1 (llevs:ke, :, :); buf(nk+1:,:,:,1) = 0. ke = MIN(SIZE(sp2,1), lleve); nk = MAX(0, ke-llevs+1) 2270: buf (:nk,:,:,2) = sp2 (llevs:ke, :, :); buf(nk+1:,:,:,2) = 0. ke = MIN(SIZE(sp3,1), lleve); nk = MAX(0, ke-llevs+1) buf (:nk,:,:,3) = sp3 (llevs:ke, :, :); buf(nk+1:,:,:,3) = 0. IF (m0) THEN ke = MIN(SIZE(sp0,1), lleve); nk = MAX(0, ke-llevs+1) 2275: bu0 (:nk,:) = sp0 (llevs:ke,:); bu0 (nk+1:,:) = 0. ENDIF END SUBROUTINE pack_sp_buf !------------------------------------------------------------------------------ SUBROUTINE unpack_buf_ls 2280: ls1 (:,:,ssps:sspe) = buf (:SIZE(ls1,1),:,:,1) ls2 (:,:,ssps:sspe) = buf (:SIZE(ls2,1),:,:,2) ls3 (:,:,ssps:sspe) = buf (:SIZE(ls3,1),:,:,3) IF (m0) THEN ls0 (:,snn0+1:snn0+nsnm0) = bu0 (:SIZE(ls0,1),:) 2285: ENDIF END SUBROUTINE unpack_buf_ls !------------------------------------------------------------------------------ END SUBROUTINE tr_ls_sp !============================================================================== 2290: SUBROUTINE trace (c,pe) ! ! routine to call for debugging printout ! CHARACTER :: c 2295: INTEGER :: pe WRITE(nerr,'(a,a,i3.3,a)') REPEAT('| ',p_pe),c,pe,& REPEAT('| ',p_nprocs-p_pe-1) END SUBROUTINE trace !============================================================================== 2300: FUNCTION indx0 (pe, gl_dc) INTEGER ,INTENT(in) :: pe ! processor id TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(:) ! global decomposition INTEGER :: indx0 ! index ! 2305: ! returns the index of a given PE in the global decomposition table ! INTEGER :: i DO i = 1, SIZE(gl_dc) IF(gl_dc(i)% pe == pe) THEN 2310: indx0 = i RETURN ENDIF END DO WRITE (nerr,*) 'mo_transpose:indx - index not found in decomposition table' 2315: WRITE (nerr,*) ' reqired:',pe WRITE (nerr,*) ' found :',gl_dc% pe CALL finish ('mo_transpose:indx','index not found in decomposition table') END FUNCTION indx0 !------------------------------------------------------------------------------ 2320: FUNCTION indx2 (pe, gl_dc) INTEGER ,INTENT(in) :: pe(:,:) ! processor id TYPE (pe_decomposed) ,INTENT(in) :: gl_dc(:) ! global decomposition INTEGER :: indx2(SIZE(pe,1),SIZE(pe,2)) ! index ! 2325: ! returns the index of a given PE in the global decomposition table ! INTEGER :: i indx2 = -1 DO i = 1, SIZE(gl_dc) 2330: WHERE(gl_dc(i)% pe == pe) indx2 = i endwhere END DO IF(ANY(indx2==-1))THEN 2335: WRITE(nerr,*)'mo_transpose:indx - index not found in decomposition table' WRITE(nerr,*)' reqired:',PACK(pe,mask=(indx2==-1)) WRITE(nerr,*)' found :',gl_dc% pe CALL finish('mo_transpose:indx','index not found in decomposition table') END IF 2340: END FUNCTION indx2 !============================================================================== END MODULE mo_transposeback to top
Info Section uses: mo_decomposition, mo_doctor, mo_exception, mo_mpi calls: alloc_fs_ls_buf, alloc_gp_fs_buf, alloc_ls_sp_buf, finish, gather_gp2 gather_gp3, gather_gp32, gather_sa2, gather_sa4, gather_sp0 gather_sp3, p_bcast, p_recv, p_send, pack_fs_buf pack_gp_buf, pack_ls_buf, pack_sp_buf, scatter_gp2, scatter_gp3 scatter_gp32, scatter_sa2, scatter_sa4, scatter_sp0, scatter_sp3 unpack_buf_fs, unpack_buf_gp, unpack_buf_ls, unpack_buf_sp
HTML derived from FORTRAN source by f2html.pl v0.3 (C) 1997,98 Beroud Jean-Marc.